[CLI/Core] Polymorphic agent dispatch + TestRun owns trial counter #893

Open
rutayan-nv wants to merge 5 commits into NVIDIA:main from rutayan-nv:rpatro/custom-training-loop-dispatch

Conversation

@rutayan-nv
Contributor

@rutayan-nv rutayan-nv commented May 15, 2026

Summary

Replaces the original HAS_CUSTOM_TRAINING_LOOP flag + Protocol + TypeGuard + if/else dispatch with polymorphism on BaseAgent.run(), and centralizes ownership of TestRun.step so the trial counter has a single mutator.

Why this changed shape

The original design (flag + Protocol + helper + branched dispatcher) is a textbook type-tagged conditional. Live cluster runs surfaced two related bugs:

  1. test_run.step had no clear owner. The handler set it from outside, the gymnasium adapter rewound it on reset(), the reporter wrote it during recovery, the batch-unroll runner cloned it. With RLlib's contextual-bandit setup (agent_steps=1, frequent reset()s), the adapter collapsed every trial onto step=1, overwriting trajectory.csv and env.csv rows.

  2. Dispatch was branched on agent identity. Fowler's "Replace Conditional with Polymorphism" refactoring is the canonical fix.

This PR addresses both at once: TestRun.increment_step() becomes the single mutator (CloudAIGymEnv.step() its only caller), and BaseAgent.run() becomes the single dispatch entry point that agents override.
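
A minimal sketch of the single-mutator contract described above, assuming simplified stand-ins for TestRun and CloudAIGymEnv. The class names TestRunSketch and GymEnvSketch, the rows list, and the int return value of increment_step() are illustrative assumptions; only the increment_step() name and the "advance first" ordering come from this PR. It shows that frequent reset() calls no longer collapse trials onto one step value.

```python
from dataclasses import dataclass


@dataclass
class TestRunSketch:
    """Hypothetical stand-in for TestRun; only the trial counter is modeled."""

    step: int = 0

    def increment_step(self) -> int:
        # The single named mutator: each call advances the counter by exactly one.
        self.step += 1
        return self.step


class GymEnvSketch:
    """Hypothetical stand-in for CloudAIGymEnv."""

    def __init__(self, test_run: TestRunSketch) -> None:
        self.test_run = test_run
        self.rows: list[int] = []  # stands in for trajectory.csv rows

    def reset(self) -> None:
        # Deliberately leaves test_run.step alone (no rewinding).
        pass

    def step(self, action: dict) -> int:
        # Advance the counter first, so anything derived from step
        # (output path, trajectory row) sees the post-increment value.
        trial = self.test_run.increment_step()
        self.rows.append(trial)
        return trial


env = GymEnvSketch(TestRunSketch())
for _ in range(3):
    env.reset()  # contextual-bandit style: one step per episode
    env.step({"lr": 0.1})
```

Because reset() never touches the counter, the three one-step episodes record three distinct trial indices rather than overwriting the same row.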

Commits

  1. feat(core): TestRun owns trial counter; CloudAIGymEnv.step() is sole mutator — adds TestRun.increment_step(), calls it at the top of CloudAIGymEnv.step(), removes the competing env.test_run.step = step writer from handle_dse_job. New TestIncrementStep test class covers the API; test_cloudaigym asserts step is advanced before output_path / trajectory rows are computed.

  2. refactor(cli): collapse custom-loop dispatch to BaseAgent.run() polymorphism — adds BaseAgent.run() -> int default holding the step-loop body; handle_dse_job collapses to err |= agent.run(). Deletes HAS_CUSTOM_TRAINING_LOOP, CustomTrainingLoopAgent Protocol, _has_custom_training_loop TypeGuard, _run_custom_training_loop helper. Net -89 lines on cloudai; surface area shrinks (no Protocol, no TypeGuard, no flag).

Agents that drive their own training (RLlib-based PPO/DQN in cloudaix) override run() to call their own algo.train() and return a process-style rc. The handler doesn't know that category exists.
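
The dispatch shape described above can be sketched roughly as follows. This is an illustration under assumptions, not the cloudai implementation: EnvSketch, BaseAgentSketch, GridAgentSketch, SelfDrivingAgentSketch, and dispatch are hypothetical names, the env returns a simplified (observation, reward) pair, and select_action() is assumed to return a (trial index, action) pair.

```python
from abc import ABC, abstractmethod


class EnvSketch:
    """Hypothetical env whose step() returns (observation, reward)."""

    def step(self, action: dict) -> tuple[list[float], float]:
        x = float(action["x"])
        return [x], -abs(x - 2.0)


class BaseAgentSketch(ABC):
    def __init__(self, env: EnvSketch, max_steps: int = 3) -> None:
        self.env = env
        self.max_steps = max_steps

    @abstractmethod
    def select_action(self) -> tuple[int, dict]:
        """Return (trial index, action)."""

    @abstractmethod
    def update_policy(self, feedback: dict) -> None:
        """Consume the reward for one trial."""

    def run(self) -> int:
        # Default behavior: the step loop that used to live in the handler.
        for _ in range(self.max_steps):
            step, action = self.select_action()
            _observation, reward = self.env.step(action)
            self.update_policy({"trial_index": step, "value": reward})
        return 0


class GridAgentSketch(BaseAgentSketch):
    """Uses the inherited step loop unchanged."""

    def __init__(self, env: EnvSketch, max_steps: int = 3) -> None:
        super().__init__(env, max_steps)
        self.trial = 0
        self.history: list[dict] = []

    def select_action(self) -> tuple[int, dict]:
        self.trial += 1
        return self.trial, {"x": self.trial}

    def update_policy(self, feedback: dict) -> None:
        self.history.append(feedback)


class SelfDrivingAgentSketch(BaseAgentSketch):
    """Owns its training loop and overrides run() (as an RLlib wrapper might)."""

    def select_action(self) -> tuple[int, dict]:
        raise NotImplementedError("run() is overridden")

    def update_policy(self, feedback: dict) -> None:
        raise NotImplementedError("run() is overridden")

    def train(self) -> None:
        raise RuntimeError("simulated training failure")

    def run(self) -> int:
        try:
            self.train()
        except Exception:
            return 1  # process-style return code
        return 0


def dispatch(agents: list[BaseAgentSketch]) -> int:
    # The handler side: no branching on agent type.
    err = 0
    for agent in agents:
        err |= agent.run()
    return err
```

The caller only ever sees run() and an integer, so adding another self-driving agent requires no change on the handler side.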

Why not a separate "step ownership" PR?

The step-ownership bug only surfaces under the custom-training-loop path (RLlib's reset() pattern is what collapses the trial counter). Fixing it as a precondition of the dispatch refactor keeps the two changes co-located and ensures the polymorphic dispatch lands on a step-counter contract that actually holds.

Related

Test plan

  • pytest tests — 1444 passed
  • ruff check src tests — clean
  • Downstream cloudaix RL workloads (PPO + DQN) reinstalled against this branch — full suite green; new contract tests in cloudaix (test_gymnasium_adapter_contract, test_env_csv_adapter_contract) confirm tr.step monotonicity end-to-end through the adapter and the user-visible env.csv artifact.
  • Cluster smoke test (ilyk MRC PRT RL run) — will validate env.csv / trajectory.csv have unique monotonic step values, no row overwrites.
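
The "unique monotonic step values" property in the last two bullets could be checked with something like the sketch below. The function name and the "step" column name are assumptions made for illustration; the actual env.csv layout is not shown in this PR.

```python
import csv
import io


def steps_are_unique_and_increasing(csv_text: str, column: str = "step") -> bool:
    """Return True when the step column is strictly increasing row by row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    steps = [int(row[column]) for row in reader]
    # Strictly increasing implies both uniqueness and monotonicity.
    return all(a < b for a, b in zip(steps, steps[1:]))


healthy = "step,reward\n1,0.5\n2,0.7\n3,0.6\n"
collapsed = "step,reward\n1,0.5\n1,0.7\n1,0.6\n"  # every trial written as step=1
```

A strict comparison is used on purpose: a repeated step value is exactly the row-overwrite symptom described in the summary.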

@coderabbitai
Contributor

coderabbitai Bot commented May 15, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Agent execution was centralized into BaseAgent.run and handlers now call agent.run(); CloudAIGymEnv.step advances TestRun.step before applying params; TestRun gained increment_step(); tests and a stub agent were added to validate run() delegation and step-indexed artifacts.

Changes

Agent-run integration and step semantics

| Layer / File(s) | Summary |
|---|---|
| TestRun.increment_step and env step ordering / src/cloudai/_core/test_scenario.py, src/cloudai/configurator/cloudai_gym.py, tests/test_cloudaigym.py, tests/test_test_scenario.py | Adds TestRun.increment_step() and makes CloudAIGymEnv.step() call it before apply_params_set; tests updated to assert post-advance trial indices in output paths and cached trajectory rows. |
| BaseAgent.run implementation / src/cloudai/configurator/base_agent.py | Adds logging import and a concrete BaseAgent.run() loop that calls select_action(), env.step(action), update_policy(...), logs observations/rewards, and returns 0 on completion. |
| DSE handler change and handler tests / src/cloudai/cli/handlers.py, tests/test_handlers.py | handle_dse_job now calls agent.run() and ORs its integer return into the aggregated error instead of performing the per-step select_action/env.step/update_policy loop; tests add a CustomRunStubAgent, fixture, and assertions that run() is invoked and non-zero return values propagate. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hop through runs and count the beats,

I step before I write the feats,
One run to rule the agent's day,
Return a code, then hop away,

🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
|---|---|---|
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Title check | ✅ Passed | The title accurately captures the main design change: replacing a flag-based dispatch pattern with polymorphism on BaseAgent.run() and centralizing the trial counter in TestRun. |
| Description check | ✅ Passed | The description comprehensively explains both the refactoring (type-tagged conditional to polymorphism) and the bug fix (centralized step ownership), relating directly to all changeset modifications. |

Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/cloudai/cli/handlers.py`:
- Around line 140-152: The finally block in _run_custom_training_loop currently
calls shutdown() directly which can raise and override the earlier return value;
wrap the shutdown invocation (getattr(agent, "shutdown", None) and the callable
check) in its own try/except Exception handler so any exceptions from shutdown
are caught and logged via logging.exception (include agent_type) and not
re-raised, ensuring the original return 0/1 from agent.train() is preserved.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: 21ef8346-6b78-45d3-8a1c-29a3c393e440

📥 Commits

Reviewing files that changed from the base of the PR and between 4bdc465 and 3b62ddc.

📒 Files selected for processing (2)
  • src/cloudai/cli/handlers.py
  • tests/test_handlers.py

Comment thread src/cloudai/cli/handlers.py Outdated
This was referenced May 15, 2026
@rutayan-nv rutayan-nv changed the title feat(cli): support agents with custom training loops in handle_dse_job [CLI] Support agents with custom training loops in handle_dse_job May 15, 2026
- Agents that set HAS_CUSTOM_TRAINING_LOOP = True drive their own training loop;
  handle_dse_job calls agent.train() and skips the per-step env.step loop.
- New _run_custom_training_loop helper logs exceptions, returns a process-style
  exit code, and always invokes agent.shutdown() (when defined) in a finally
  block so resources are released on both success and failure paths.
- CustomTrainingLoopAgent Protocol documents the opt-in contract for type
  checkers and IDEs.
Pyright rejected calling _run_custom_training_loop(agent, ...) because the
plain bool predicate did not narrow agent's static type from BaseAgent to
CustomTrainingLoopAgent. Return TypeGuard[CustomTrainingLoopAgent] from
_has_custom_training_loop so the truthy branch in handle_dse_job sees the
opted-in shape and the helper can call agent.train() directly.
If agent.shutdown() raised from the finally block, Python suppressed the
earlier return 0/1 from agent.train() and propagated the exception, breaking
the outer test-run loop in handle_dse_job (skipped remaining scenarios,
failed to accumulate err |= rc). Wrap shutdown() in its own try/except,
log via logging.exception, set rc = 1, and return rc after finally so the
helper always honours the (int) -> int contract.

Adds tests for shutdown-only failure and combined train+shutdown failure.
@rutayan-nv rutayan-nv force-pushed the rpatro/custom-training-loop-dispatch branch from 3ffe893 to 9552e5a Compare May 18, 2026 16:33
Comment thread src/cloudai/cli/handlers.py Outdated
return installables, installer


@runtime_checkable
Contributor

let's move this code into base_agent.py. handlers.py is already too long

as for the tests against _run_custom_training_loop: I'm starting to make the tests folder structure replicate the main code structure. so in this case, I'd place all the relevant tests you added into tests/configurator/test_base_agent.py

(not related to tests against handle_dse_job)

Comment thread src/cloudai/cli/handlers.py Outdated
agent = agent_class(env, agent_config)

if _has_custom_training_loop(agent):
err |= _run_custom_training_loop(agent, agent_type)
Contributor

shouldn't we exit (immediate return err) if err is greater than zero? The existing code above doesn't really treat the err well but maybe it's the time to start doing so :D

…mutator

Previously, ``test_run.step`` had no clear owner: the dispatcher set it from
outside, the adapter rewound it on ``reset()``, and other callers wrote to
it ad hoc. In RLlib custom-loop runs this collapsed every trial onto
``step=1``, overwriting ``trajectory.csv`` and ``env.csv`` rows.

Centralize the invariant: ``TestRun.increment_step()`` is the single named
mutator, and ``CloudAIGymEnv.step()`` is its only caller. One ``env.step()``
call advances the trial counter by exactly one — independent of any episode
or dispatcher concept above the gym env.

Contract tests in ``TestIncrementStep`` cover the API; ``test_cloudaigym``
asserts ``step`` is advanced *before* ``output_path`` and trajectory rows
are computed, so cached and live trials both record the post-increment value.
…orphism

Earlier commits in this PR introduced ``HAS_CUSTOM_TRAINING_LOOP`` + a
``CustomTrainingLoopAgent`` Protocol + a TypeGuard helper + an ``if/else``
in ``handle_dse_job`` to switch between the cloudai step loop and an
agent-owned ``train()`` loop. That is a type-tagged conditional dispatching
on agent identity — the textbook signal to replace conditional with
polymorphism (Fowler).

Add a default ``BaseAgent.run() -> int`` that holds the step-loop body
(``select_action`` / ``env.step`` / ``update_policy`` per trial). Agents
that drive their own training (RLlib, etc.) override ``run()`` to delegate
to whatever loop they own and return a process-style exit code.

``handle_dse_job`` collapses to ``err |= agent.run()`` — one line, no
branching, no Protocol vocabulary. The handler no longer knows that
"custom training loops" exist as a category; that's an agent implementation
detail.

Net: -89 lines on cloudai. Surface area shrinks (no Protocol, no TypeGuard,
no flag). ``test_handlers`` replaces the 5 helper unit tests + 2 dispatcher
integration tests with 2 polymorphic tests asserting ``handle_dse_job``
delegates to ``agent.run()`` and propagates its return code.
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/cloudai/cli/handlers.py`:
- Line 160: handle_dse_job currently calls agent.run() directly which can throw
and abort the whole DSE batch; wrap the call to agent.run() in a try/except that
catches any exception, sets/updates the existing err variable to a non-zero
return code (e.g., err |= 1 or err = 1) and continues processing remaining runs
instead of re-raising; ensure the catch logs the exception (including agent
identity) for debugging and references the agent.run() call and err variable so
the change is applied in the handle_dse_job function.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Enterprise

Run ID: 843e393f-7ffd-4c66-ba6b-cd08d22ab804

📥 Commits

Reviewing files that changed from the base of the PR and between 9552e5a and a1a268a.

📒 Files selected for processing (7)
  • src/cloudai/_core/test_scenario.py
  • src/cloudai/cli/handlers.py
  • src/cloudai/configurator/base_agent.py
  • src/cloudai/configurator/cloudai_gym.py
  • tests/test_cloudaigym.py
  • tests/test_handlers.py
  • tests/test_test_scenario.py

feedback = {"trial_index": step, "value": reward}
agent.update_policy(feedback)
logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")
err |= agent.run()
Contributor

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard agent.run() failures to preserve DSE batch execution.

Line 160 can raise out of handle_dse_job if an agent throws, which aborts remaining test runs and bypasses intended err accumulation. Convert exceptions into a non-zero rc and continue.

Suggested patch
-        err |= agent.run()
+        try:
+            err |= agent.run()
+        except Exception:
+            logging.exception("Agent %s failed during run().", agent_type)
+            err |= 1
+            continue
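
A self-contained sketch of what the suggested patch aims for, using hypothetical stub agents and a hypothetical run_all function in place of handle_dse_job. It only illustrates the control flow: a raising agent is logged and counted as a failure, and the remaining agents still run.

```python
import logging


class OkAgent:
    def __init__(self) -> None:
        self.ran = False

    def run(self) -> int:
        self.ran = True
        return 0


class CrashingAgent:
    def run(self) -> int:
        raise RuntimeError("simulated agent failure")


def run_all(agents: dict) -> int:
    """Run every agent; convert exceptions into a non-zero aggregate rc."""
    err = 0
    for agent_type, agent in agents.items():
        try:
            err |= agent.run()
        except Exception:
            logging.exception("Agent %s failed during run().", agent_type)
            err |= 1
            continue
    return err


later = OkAgent()
result = run_all({"crashing": CrashingAgent(), "ok": later})
```

Note that err |= rc is a bitwise OR, so the aggregate is non-zero whenever any run failed, but it is not a count of failures.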

@rutayan-nv rutayan-nv changed the title [CLI] Support agents with custom training loops in handle_dse_job [CLI/Core] Polymorphic agent dispatch + TestRun owns trial counter May 23, 2026
@rutayan-nv
Contributor Author

Downstream cloudaix PR that consumes this contract: https://github.com/Mellanox/cloudaix/pull/589

It ports GymnasiumAdapter to cloudaix (where its only consumer — the RL agent stack — lives), introduces a rllib_run dispatch helper for shared RLlib orchestration code, and refactors PPO/DQN onto the new BaseAgent.run() polymorphism. The cloudaix PR also pins the user-visible env.csv monotonicity contract — that's the test that would have caught the original cluster-run bug at unit-test time.

Together the two PRs are the architectural fix for the trial-counter collapse: cloudai owns the trial-counter contract (TestRun.increment_step() + CloudAIGymEnv.step() as sole mutator), cloudaix owns the RL-specific glue (adapter + dispatch helper + agent overrides).

2 participants